package canal_client.kafka;

import canal.bean.RowData;
import canal_client.util.ConfigUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;


/**
 * Thin wrapper around a {@link KafkaProducer} that publishes {@link RowData}
 * values to the topic returned by {@code ConfigUtil.kafkaTopic()}.
 *
 * <p>All producer settings (bootstrap servers, acks, retries, batch size and
 * the key/value serializers) are read from {@code ConfigUtil} when the sender
 * is constructed.
 *
 * <p>The sender owns its producer; call {@link #close()} (or use
 * try-with-resources) when it is no longer needed so the producer is shut
 * down instead of being leaked.
 */
public class KafkaSender implements AutoCloseable {

    /** Producer used for every {@link #send(RowData)} call; created once in the constructor. */
    private final KafkaProducer<String, RowData> kafkaProducer;

    /**
     * Builds the producer configuration from {@code ConfigUtil} and creates the
     * underlying {@link KafkaProducer}.
     */
    public KafkaSender() {
        // The properties are only needed to construct the producer, so they
        // are kept local rather than held as mutable instance state.
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", ConfigUtil.kafkaBootstrap_servers_config());
        kafkaProps.put("acks", ConfigUtil.kafkaAcks());
        kafkaProps.put("retries", ConfigUtil.kafkaRetries());
        kafkaProps.put("batch.size", ConfigUtil.kafkaBatch_size_config());
        kafkaProps.put("key.serializer", ConfigUtil.kafkaKey_serializer_class_config());
        kafkaProps.put("value.serializer", ConfigUtil.kafkaValue_serializer_class_config());
        kafkaProducer = new KafkaProducer<String, RowData>(kafkaProps);
    }

    /**
     * Prints the row to standard output and hands it to the producer as a
     * key-less record on the configured topic.
     *
     * <p>The value returned by the producer's {@code send} call is not
     * inspected here, so this method does not report the delivery outcome.
     *
     * @param rowData the row to publish; must not be {@code null}
     * @throws NullPointerException if {@code rowData} is {@code null}
     */
    public void send(RowData rowData) {
        if (rowData == null) {
            // Same exception type the original raised via rowData.toString(),
            // but with a message that names the offending argument.
            throw new NullPointerException("rowData must not be null");
        }
        System.out.println(rowData.toString());
        kafkaProducer.send(new ProducerRecord<String, RowData>(ConfigUtil.kafkaTopic(), rowData));
    }

    /**
     * Closes the underlying producer. The sender must not be used for
     * {@link #send(RowData)} afterwards.
     */
    @Override
    public void close() {
        kafkaProducer.close();
    }

}
